Overview

  1. Create Elasticsearch Mappings
    1. Load data into Elasticsearch (see the "Enrich & Prepare MovieLens Dataset.ipynb" notebook)
  2. Load ratings data and run ALS
  3. Save ALS model factors to Elasticsearch
  4. Show similar items using Elasticsearch

1. Set up Elasticsearch mappings

References:


In [ ]:
from elasticsearch import Elasticsearch
# Create the Elasticsearch client used by the rest of this notebook; no arguments
# are passed, so the client library's default connection settings apply.
es = Elasticsearch()

In [ ]:
def _exact_string():
    '''Return a fresh mapping for a string field indexed as-is ("not_analyzed")'''
    return {"type": "string", "index": "not_analyzed"}


def _factor_model():
    '''Return a fresh "@model" object mapping: a payload-analyzed "factor" string and an exact "version" string'''
    return {
        "properties": {
            "factor": {
                "type": "string",
                "term_vector": "with_positions_offsets_payloads",
                "analyzer": "payload_analyzer"
            },
            "version": _exact_string()
        }
    }


create_index = {
    # "payload_analyzer" combines the whitespace tokenizer with the delimited
    # payload token filter; the "@model.factor" fields below are analyzed with it
    "settings": {
        "analysis": {
            "analyzer": {
                "payload_analyzer": {
                    "type": "custom",
                    "tokenizer": "whitespace",
                    "filter": "delimited_payload_filter"
                }
            }
        }
    },
    # one mapping per document type: ratings, users and movies
    "mappings": {
        "ratings": {
            "properties": {
                "timestamp": {"type": "date"},
                "userId": _exact_string(),
                "movieId": _exact_string(),
                "rating": {"type": "double"}
            }
        },
        "users": {
            "properties": {
                "name": {"type": "string"},
                "@model": _factor_model()
            }
        },
        "movies": {
            "properties": {
                "genres": {"type": "string"},
                "original_language": _exact_string(),
                "image_url": _exact_string(),
                "release_date": {"type": "date"},
                "popularity": {"type": "double"},
                "@model": _factor_model()
            }
        }
    }
}
# create the "demo" index using the settings & mappings assembled above
es.indices.create(index="demo", body=create_index)

Load User, Movie and Ratings DataFrames from Elasticsearch

Show schemas


In [ ]:
# Read the "demo/users" resource through the "es" data source
user_df = (
    sqlContext.read
    .format("es")
    .load("demo/users")
)
# Print the DataFrame schema, then preview id and name for five rows
user_df.printSchema()
user_df.select("userId", "name").show(5)

In [ ]:
# Read the "demo/movies" resource through the "es" data source
movie_df = (
    sqlContext.read
    .format("es")
    .load("demo/movies")
)
# Print the DataFrame schema, then preview the main descriptive columns for five rows
movie_df.printSchema()
movie_df.select(
    "movieId", "title", "genres", "release_date", "popularity"
).show(5)

In [ ]:
# Read the "demo/ratings" resource through the "es" data source
ratings_df = (
    sqlContext.read
    .format("es")
    .load("demo/ratings")
)
# Print the DataFrame schema, then preview five rows
ratings_df.printSchema()
ratings_df.show(5)

2. Run ALS


In [ ]:
from pyspark.ml.recommendation import ALS
# Configure ALS on the ratings columns with rank 20 and regParam 0.1
als = ALS(
    rank=20,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating"
)
# Fit on the ratings DataFrame and preview the learned user and item factors
model = als.fit(ratings_df)
model.userFactors.show(5)
model.itemFactors.show(5)

3. Write ALS user and item factors to Elasticsearch

Utility functions for converting factor vectors


In [ ]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, lit

def convert_vector(x):
    '''Render a list or numpy array as space-separated "index|value" tokens (delimited token filter format)'''
    tokens = []
    for position, value in enumerate(x):
        tokens.append("%s|%s" % (position, value))
    return " ".join(tokens)
def reverse_convert(s):
    '''Convert a delimited token filter format string back to list format.

    s is a whitespace-separated string of "index|value" tokens. The values are
    returned as a list of floats, in token order; the index part of each token
    is ignored.

    An empty or all-whitespace string yields an empty list, so the empty
    vector round-trips (splitting on a literal " " would produce a single
    empty token and fail with IndexError).
    '''
    # split() with no argument skips leading, trailing and repeated whitespace
    return [float(token.split("|")[1]) for token in s.split()]
def vector_to_struct(x, version):
    '''Pair the string-format encoding of vector x with the given version, as a (factor, version) tuple'''
    factor_string = convert_vector(x)
    return (factor_string, version)
# UDF wrapping vector_to_struct; its declared return type is a struct of two
# nullable string fields, "factor" and "version"
vector_struct = udf(
    vector_to_struct,
    StructType([
        StructField("factor", StringType(), True),
        StructField("version", StringType(), True)
    ])
)

In [ ]:
# test out the vector conversion function on the first row of user factors
test_vec = model.userFactors.select("features").first().features
# Single-argument print(...) and print("") produce the same output whether
# print is the Python 2 statement or the Python 3 function, so this cell runs
# on both interpreters (bare "print x" statements are a SyntaxError on Python 3).
print(test_vec)
print("")
print(convert_vector(test_vec))

Convert factor vectors to [factor, version] form and write to Elasticsearch


In [ ]:
# Use the fitted model's uid as the version tag stored next to every factor vector
ver = model.uid

# Item factors -> ("id", "@model" struct holding the string-format factor and the version)
movie_vectors = model.itemFactors.select(
    "id",
    vector_struct("features", lit(ver)).alias("@model")
)
movie_vectors.select("id", "@model.factor", "@model.version").show(5)

# User factors get the same treatment
user_vectors = model.userFactors.select(
    "id",
    vector_struct("features", lit(ver)).alias("@model")
)
user_vectors.select("id", "@model.factor", "@model.version").show(5)

In [ ]:
# Write the movie vectors to ES:
# - the "id" column is mapped to the ES document id
# - the ES write operation is "update"
# - the Spark save mode is "append"
(
    movie_vectors.write
    .format("es")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "update")
    .save("demo/movies", mode="append")
)

In [ ]:
# Write the user vectors to ES with the "id" column as document id, the ES
# "update" write operation and the Spark "append" save mode
(
    user_vectors.write
    .format("es")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "update")
    .save("demo/users", mode="append")
)

Check that the data was written correctly


In [ ]:
# Fetch a single "movies" document from the "demo" index matching the free-text
# query, so the stored fields can be inspected in the cell output
es.search(index="demo", doc_type="movies", q="star wars force", size=1)

4. Recommend using Elasticsearch!


In [ ]:
from IPython.display import Image, HTML, display

def fn_query(query_vec, q="*", cosine=False):
    '''Build a function_score search body scored by the native "payload_vector_score" script.

    query_vec is passed to the script as the "vector" parameter, q is the
    query_string used to select candidate documents, and cosine is forwarded
    to the script as its "cosine" parameter. The script score replaces the
    query score ("boost_mode": "replace").
    '''
    candidate_filter = {
        "query_string": {
            "query": q
        }
    }
    scorer = {
        "script": "payload_vector_score",
        "lang": "native",
        "params": {
            "field": "@model.factor",
            "vector": query_vec,
            "cosine": cosine
        }
    }
    function_score = {
        "query": candidate_filter,
        "script_score": scorer,
        "boost_mode": "replace"
    }
    return {"query": {"function_score": function_score}}

def get_similar(the_id, q="*", num=10, index="demo", dt="movies"):
    '''Find the documents most similar to document the_id by factor-vector score.

    the_id -- id of the query document in index/dt
    q      -- query_string restricting the candidate documents
    num    -- maximum number of similar documents to return
    index  -- Elasticsearch index name
    dt     -- Elasticsearch document type

    Returns a (source, hits) tuple: the query document's _source and a list of
    at most num search hits, excluding the query document itself. If the query
    document has no stored "@model.factor", hits is an empty list, so callers
    can always unpack the result.
    '''
    response = es.get(index=index, doc_type=dt, id=the_id)
    src = response['_source']
    raw_vec = (src.get('@model') or {}).get('factor')
    if not raw_vec:
        # no factor vector stored for this document: nothing to compare against
        return src, []
    # our script actually uses the list form for the query vector and handles conversion internally
    query_vec = reverse_convert(raw_vec)
    body = fn_query(query_vec, q=q, cosine=True)
    # request one extra hit: the query document may be among the results and is
    # removed below (without an explicit size the search returns its default page)
    results = es.search(index=index, doc_type=dt, body=body, size=num + 1)
    # drop the query document by id rather than by position, because the filter
    # q may already exclude it, in which case the first hit is a real result
    hits = [hit for hit in results['hits']['hits']
            if str(hit['_id']) != str(the_id)]
    return src, hits[:num]

def display_similar(the_id, q="*", num=10, index="demo", dt="movies"):
    '''Render the query movie's image followed by a table of similar movies and their scores.

    The arguments are passed straight through to get_similar. Each similar
    movie is shown as an image cell plus a score cell, with a new table row
    started after every five movies.
    '''
    movie, recs = get_similar(the_id, q, num, index, dt)
    # query movie section
    query_image_url = movie['image_url']
    display(HTML("<h2>Get similar movies for:</h2>"))
    display(Image(query_image_url, width=200))
    display(HTML("<br>"))
    display(HTML("<h2>Similar movies:</h2>"))
    # similar movies section, assembled piece by piece and joined at the end
    pieces = ["<table border=0><tr>"]
    for count, rec in enumerate(recs, 1):
        cell = "<td><img src=%s width=200></img></td><td>%2.3f</td>" % (
            rec['_source']['image_url'], rec['_score'])
        pieces.append(cell)
        if count % 5 == 0:
            pieces.append("</tr><tr>")
    pieces.append("</tr></table>")
    display(HTML("".join(pieces)))

In [ ]:
# Show up to 5 movies similar to the movie with id 122886, using the default "*" query
display_similar(122886, num=5)

In [ ]:
# Same query movie, but candidates are restricted by a query string on the title field (NOT trek)
display_similar(122886, num=5, q="title:(NOT trek)")

In [ ]:
# Movies similar to id 6377, restricted by a query string on genre (children)
# and on a release_date range ending now
display_similar(6377, num=5, q="genres:children AND release_date:[now-2y/y TO now]")